package com;

import com.google.gson.Gson;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.clients.producer.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * Ad-hoc Kafka producer throughput test.
 *
 * <p>{@link #main(String[])} loads {@code producer.properties}, builds a
 * {@link KafkaProducer} from it, and then synchronously sends 15000 records,
 * each carrying a Java-serialized {@code HashMap<String, Double>} of 60000
 * entries, printing the elapsed time every 1000 records.
 *
 * <p>All state is held in static fields, so {@link #loadConfig()} and
 * {@link #initProducer()} must be called (in that order) before
 * {@link #sendMsg(String, String, HashMap)}.
 */
public class KafkaTest2 {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTest2.class);

    private static Gson gson = new Gson();

    /** Raw contents of {@code producer.properties}, filled by {@link #loadConfig()}. */
    private static final Properties PROPS = new Properties();

    /** Value of the {@code topic} property, trimmed; set by {@link #loadConfig()}. */
    private static String topics;

    /** Shared producer instance; created by {@link #initProducer()}. */
    private static Producer<String, byte[]> producer;


    // Keys looked up in producer.properties.
    public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
    public static final String ACKS_CONFIG = "acks";
    public static final String RETRIES_CONFIG = "retries";
    public static final String LINGER_MS_CONFIG = "linger.ms";
    public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
    public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
    public static final String BUFFER_MEMORY_CONFIG = "buffer.memory";
    public static final String BATCH_SIZE_CONFIG = "batch.size";
    public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes";
    public static final String MAX_REQUEST_SIZE_CONFIG = "max.request.size";
    public static final String COMPRESSION_TYPE = "compression.type";


    /**
     * Builds the shared producer from the properties loaded by {@link #loadConfig()}.
     *
     * <p>Settings that are absent from the loaded properties are skipped (with a
     * warning) instead of being inserted as {@code null}, which
     * {@link Properties#put(Object, Object)} rejects with a
     * {@link NullPointerException}. The client id is always generated as
     * {@code "producer-" + currentTimeMillis}.
     */
    public static void initProducer() {
        Properties config = new Properties();
        copyIfPresent(config, ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
        copyIfPresent(config, ProducerConfig.ACKS_CONFIG, ACKS_CONFIG);
        copyIfPresent(config, ProducerConfig.RETRIES_CONFIG, RETRIES_CONFIG);
        copyIfPresent(config, ProducerConfig.BATCH_SIZE_CONFIG, BATCH_SIZE_CONFIG);
        copyIfPresent(config, ProducerConfig.LINGER_MS_CONFIG, LINGER_MS_CONFIG);
        copyIfPresent(config, ProducerConfig.BUFFER_MEMORY_CONFIG, BUFFER_MEMORY_CONFIG);
        copyIfPresent(config, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER_CLASS_CONFIG);
        copyIfPresent(config, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER_CLASS_CONFIG);
        copyIfPresent(config, ProducerConfig.SEND_BUFFER_CONFIG, SEND_BUFFER_CONFIG);
        copyIfPresent(config, ProducerConfig.MAX_REQUEST_SIZE_CONFIG, MAX_REQUEST_SIZE_CONFIG);
        copyIfPresent(config, ProducerConfig.COMPRESSION_TYPE_CONFIG, COMPRESSION_TYPE);
        config.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-" + System.currentTimeMillis());
        producer = new KafkaProducer<>(config);
    }

    /** Copies {@code PROPS[sourceKey]} into {@code target[targetKey]} when it is set. */
    private static void copyIfPresent(Properties target, String targetKey, String sourceKey) {
        Object value = PROPS.get(sourceKey);
        if (value == null) {
            LOGGER.warn("Property '{}' is not set in producer.properties; not passing it to the producer", sourceKey);
            return;
        }
        target.put(targetKey, value);
    }

    /**
     * Loads {@code producer.properties} (located via {@code ResourceUtils}) into
     * the shared properties and reads the target topic from its {@code topic} key.
     *
     * @throws Exception if the file cannot be read (the {@link IOException} is
     *                   attached as the cause)
     * @throws IllegalStateException if the {@code topic} property is missing or blank
     */
    public static void loadConfig() throws Exception {
        String confPath = ResourceUtils.getAbsolutePath("producer.properties");
        LOGGER.info("配置文件=》{}", confPath);

        // try-with-resources so the stream is closed even when load() fails.
        try (FileInputStream in = new FileInputStream(confPath)) {
            PROPS.load(in);
        } catch (IOException e) {
            LOGGER.error("配置文件加载路径不正确!" + confPath, e);
            throw new Exception("配置文件加载路径不正确!" + confPath, e);
        }

        String topic = PROPS.getProperty("topic");
        if (topic == null || topic.trim().isEmpty()) {
            throw new IllegalStateException("Missing required property 'topic' in " + confPath);
        }
        topics = topic.trim();
    }

    /**
     * Serializes {@code msg} with Java serialization and sends it synchronously,
     * blocking until the send's future completes.
     *
     * @param topic destination topic
     * @param key   record key
     * @param msg   payload; {@code null} means "nothing to send"
     * @return {@code true} if the send completed successfully; {@code false} if
     *         {@code msg} was {@code null}, the send failed, or the calling
     *         thread was interrupted while waiting
     * @throws IllegalStateException if {@link #initProducer()} has not been called
     * @throws Exception declared for backward compatibility with existing callers
     */
    public static boolean sendMsg(String topic, String key, HashMap<String,Double> msg) throws Exception {
        if (msg == null) {
            return false;
        }
        if (producer == null) {
            throw new IllegalStateException("initProducer() must be called before sendMsg()");
        }

        byte[] bytes = SerializationUtils.serialize(msg);
        ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, key, bytes);
        Future<RecordMetadata> pending = producer.send(record);
        try {
            pending.get();
            return true;
        } catch (InterruptedException e) {
            // Restore the flag so callers further up the stack can observe the interrupt.
            Thread.currentThread().interrupt();
            LOGGER.error("Interrupted while waiting for send to complete, topic={} key={}", topic, key, e);
            return false;
        } catch (ExecutionException e) {
            LOGGER.error("Send failed, topic={} key={}", topic, key, e);
            return false;
        }
    }


    /**
     * Runs the throughput test: 15000 synchronous sends of a 60000-entry map,
     * printing the cumulative elapsed milliseconds every 1000 records.
     */
    public static void main(String[] args) throws Exception {
        loadConfig();
        initProducer();
        try {
            long start = System.currentTimeMillis();
            HashMap<String,Double> map = new HashMap<>();
            for (int i = 0; i < 15000; i++) {
                // The payload is rebuilt on every iteration, as in the original
                // test, so map construction stays inside the timed loop.
                map.clear();
                for (int j = 1; j < 60001; j++) {
                    map.put((start + 1) + ":" + j, start + 1d);
                }
                if (!sendMsg(topics, "p" + i, map)) {
                    LOGGER.warn("Record {} was not sent successfully", i);
                }
                if (i % 1000 == 0) {
                    System.out.println(i + "=>耗时：" + (System.currentTimeMillis() - start));
                }
            }
        } finally {
            // Release the producer's network resources and background thread.
            producer.close();
        }
    }
}

